热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

血缘|表里_SparkSQL字段血缘扩展实践!

篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkSQL字段血缘扩展实践!相关的知识,希望对你有一定的参考价值。目录

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Spark SQL字段血缘扩展实践!相关的知识,希望对你有一定的参考价值。



目录


  • 一、背景
  • 二、前期调研
  • 三、Spark SQL 扩展
    • 3.1 Spark 可扩展的内容
    • 3.2 实现自己的扩展
    • 3.3 扩展的规则类
    • 3.4 具体的实现方法

  • 四、总结


一、背景

字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?

有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。

平台计划将 Hive 任务迁移到 Spark SQL 上,同时也需要实现字段血缘的功能。


二、前期调研

开发前我们做了很多相关调研,从中得知 Spark 是支持扩展的:允许用户对 Spark SQL 的 SQL 解析、逻辑计划的分析和检查、逻辑计划的优化、物理计划的形成等进行扩展。

该方案可行,且对 Spark 的源码没有改动,代价也比较小,确定使用该方案。


三、Spark SQL 扩展

3.1 Spark 可扩展的内容

SparkSessionExtensions是比较重要的一个类,其中定义了注入规则的方法,现在支持以下内容:

【Analyzer Rules】逻辑计划分析规则

【Check Analysis Rules】逻辑计划检查规则

【Optimizer Rules.】 逻辑计划优化规则

【Planning Strategies】形成物理计划的策略

【Customized Parser】自定义的sql解析器

【(External) Catalog listeners catalog】监听器

在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。

而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。


3.2 实现自己的扩展

class ExtralSparkExtension extends (SparkSessionExtensions => Unit)
override def apply(spark: SparkSessionExtensions): Unit =
//字段血缘
spark.injectCheckRule(FieldLineageCheckRuleV3)
//sql解析器
spark.injectParser case (_, parser) => new ExtraSparkParser(parser)


上面按照这种方式实现扩展,并在 apply 方法中把自己需要的规则注入到 SparkSessionExtensions 即可,除了以上四种可以注入的以外还有其他的规则。要让 ExtralSparkExtension 起到作用的话我们需要在spark-default.conf下配置

spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension

在启动 Spark 任务的时候即可生效。

注意到我们也实现了一个自定义的SQL解析器,其实该解析器并没有做太多的事情。只是在判断如果该语句包含insert的时候就将 SQLText(SQL语句)设置到一个为FIELD_LINE_AGE_SQL,之所以将SQLText放到FIELD_LINE_AGE_SQL里面。因为在 DheckRule 里面是拿不到SparkPlan的我们需要对SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的实现也特别简单,重要的在另一个线程实现里面。

这里我们只关注了insert语句,因为插入语句里面有从某些个表里面输入然后写入到某个表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging
override def parsePlan(sqlText: String): LogicalPlan =
val lineAgeEnabled = SparkSession.getActiveSession
.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
logDebug(s"SqlText: $sqlText")
if(sqlText.toLowerCase().contains("insert"))
if(lineAgeEnabled)
if(FIELD_LINE_AGE_SQL_COULD_SET.get())
//线程本地变量在这里
FIELD_LINE_AGE_SQL.set(sqlText)

FIELD_LINE_AGE_SQL_COULD_SET.remove()


delegate.parsePlan(sqlText)

//调用原始的sqlparser
override def parseExpression(sqlText: String): Expression =
delegate.parseExpression(sqlText)

//调用原始的sqlparser
override def parseTableIdentifier(sqlText: String): TableIdentifier =
delegate.parseTableIdentifier(sqlText)

//调用原始的sqlparser
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
delegate.parseFunctionIdentifier(sqlText)

//调用原始的sqlparser
override def parseTableSchema(sqlText: String): StructType =
delegate.parseTableSchema(sqlText)

//调用原始的sqlparser
override def parseDataType(sqlText: String): DataType =
delegate.parseDataType(sqlText)



3.3 扩展的规则类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit )
val executor: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
override def apply(plan: LogicalPlan): Unit =
val sql = FIELD_LINE_AGE_SQL.get
FIELD_LINE_AGE_SQL.remove()
if(sql != null)
//这里我们拿到sql然后启动一个线程做剩余的解析任务
val task = new FieldLineageRunnableV3(sparkSession,sql)
executor.execute(task)



很简单,我们只是拿到了 SQL 然后便启动了一个线程去得到 SparkPlan,实际逻辑在

FieldLineageRunnableV3。


3.4 具体的实现方法

3.4.1 得到 SparkPlan

我们在 run 方法中得到 SparkPlan:

override def run(): Unit =
val parser = sparkSession.sessionState.sqlParser
val analyzer = sparkSession.sessionState.analyzer
val optimizer = sparkSession.sessionState.optimizer
val planner = sparkSession.sessionState.planner
............
val newPlan = parser.parsePlan(sql)
PASS_TABLE_AUTH.set(true)
val analyzedPlan = analyzer.executeAndCheck(newPlan)
val optimizerPlan = optimizer.execute(analyzedPlan)
//得到sparkPlan
val sparkPlan = planner.plan(optimizerPlan).next()
...............
if(targetTable != null)
val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
//projection
projectionLineAge(levelProject, sparkPlan.child)
//predication
predicationLineAge(predicates, sparkPlan.child)
...............

为什么要使用 SparkPlan 呢?当初我们考虑的时候,物理计划拿取字段关系的时候是比较准的,且链路比较短也更直接。

在这里补充一下 Spark SQL 解析的过程如下:

经过SqlParser后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过Analyzer会分析一些绑定信息,例如表验证、字段信息、函数信息;经过Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是RBO,当然 Spark 还支持CBO的优化;经过SparkPlanner后就成了可执行的物理计划。

我们看一个逻辑计划与物理计划对比的例子:
一个 SQL 语句:


select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

逻辑计划是这样的:

物理计划是这样的:

显然简化了很多。
得到 SparkPlan 后,我们就可以根据不同的SparkPlan节点做迭代处理。
我们将字段血缘分为两种类型:projection(select查询字段)、predication(wehre查询条件)。
这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。
想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

那么我们迭代查询的结果应该为

id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。

注意到有该变量

val levelProject = new ArrayBuffer

ArrayBuffer[NameExpressionHolder],通过projecti-onLineAge 迭代后 levelProject 存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。
例子及效果:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
select C.id,concat(C.name,B.name) as name, B.age from
B,C where C.id = B.id

效果:


"edges": [

"sources": [
3
],
"targets": [
0
],
"expression": "id",
"edgeType": "PROJECTION"
,

"sources": [
4,
7
],
"targets": [
1
],
"expression": "name",
"edgeType": "PROJECTION"
,

"sources": [
5
],
"targets": [
2
],
"expression": "age",
"edgeType": "PROJECTION"
,

"sources": [
6,
3
],
"targets": [
0,
1,
2
],
"expression": "INNER",
"edgeType": "PREDICATE"
,

"sources": [
6,
5
],
"targets": [
0,
1,
2
],
"expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
"edgeType": "PREDICATE"
,

"sources": [
3
],
"targets": [
0,
1,
2
],
"expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
"edgeType": "PREDICATE"

],
"vertices": [

"id": 0,
"vertexType": "COLUMN",
"vertexId": "default.tab3.id"
,

"id": 1,
"vertexType": "COLUMN",
"vertexId": "default.tab3.name"
,

"id": 2,
"vertexType": "COLUMN",
"vertexId": "default.tab3.age"
,

"id": 3,
"vertexType": "COLUMN",
"vertexId": "default.tab1.id"
,

"id": 4,
"vertexType": "COLUMN",
"vertexId": "default.tab1.name"
,

"id": 5,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.age"
,

"id": 6,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.id"
,

"id": 7,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.name"

]

四、总结

在 Spark SQL 的字段血缘实现中,我们通过其自扩展,首先拿到了 insert 语句,在我们自己的检查规则中拿到

SQL 语句,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终得到了物理计划。

我们通过迭代物理计划,根据不同执行计划做对应的转换,然后就得到了字段之间的对应关系。当前的实现是比较简单的,字段之间是直线的对应关系,中间过程被忽略,如果想实现字段的转换的整个过程也是没有问题的。


推荐阅读
  • 本文介绍了在go语言中利用(*interface{})(nil)传递参数类型的原理及应用。通过分析Martini框架中的injector类型的声明,解释了values映射表的作用以及parent Injector的含义。同时,讨论了该技术在实际开发中的应用场景。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 有没有一种方法可以在不继承UIAlertController的子类或不涉及UIAlertActions的情况下 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
author-avatar
liqiqinai
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有